package com.isyscore.os.metadata.kettle.step;

import com.isyscore.os.metadata.enums.KettleDataFlowNodeType;
import com.isyscore.os.metadata.kettle.base.AbstractStep;
import com.isyscore.os.metadata.kettle.base.FlowConfig;
import com.isyscore.os.metadata.kettle.vis.TableInputNode;
import com.isyscore.os.metadata.kettle.vis.VisNode;
import com.isyscore.os.metadata.utils.StringEscapeHelper;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.plugins.PluginInterface;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.pentaho.di.trans.step.errorhandling.StreamInterface;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;

import java.util.List;


public class TableInputStep extends AbstractStep {

    @Override
    public StepMeta decode(VisNode step, List<DatabaseMeta> databases, TransMeta transMeta, FlowConfig transGraph) throws Exception {
        TableInputNode node = (TableInputNode)step;
        PluginRegistry registry = PluginRegistry.getInstance();
        PluginInterface sp = registry.findPluginWithId( StepPluginType.class, KettleDataFlowNodeType.TableInput.name() );
        StepMetaInterface stepMetaInterface = (StepMetaInterface) registry.loadClass( sp );

        TableInputMeta tableInputMeta = (TableInputMeta) stepMetaInterface;
        tableInputMeta.setDataBaseName(node.getDatabaseName());
        tableInputMeta.setDatabaseMeta(DatabaseMeta.findDatabase(transMeta.getDatabases(), node.getDataSourceId() + "-" + node.getDatabaseName()));
        tableInputMeta.setSQL(StringEscapeHelper.decode(node.getSql().replaceAll("%(?![0-9a-fA-F]{2})", "%25").replaceAll("\\+", "%2B")));
        tableInputMeta.setRowLimit("0");

        tableInputMeta.setExecuteEachInputRow(false);
        tableInputMeta.setVariableReplacementActive(true);
        tableInputMeta.setLazyConversionActive(false);

        StepMeta stepMeta = new StepMeta(KettleDataFlowNodeType.TableInput.name(), node.getNodeName(), stepMetaInterface);
        stepMeta.setDraw(true);
//        transMeta.addStep(stepMeta);
        return stepMeta;
    }

    @Override
    public StepMeta after(VisNode step, List<DatabaseMeta> databases, TransMeta transMeta, FlowConfig transGraph) throws Exception {
        //处理表输入前还有表输入的情况，默认取第一个为输入。后续版本可以通过页面选择的方式指定
            StepMeta stepMeta = getStep(transMeta,step.getNodeName());
            TableInputMeta tableInputMeta = (TableInputMeta)stepMeta.getStepMetaInterface();
            if(transMeta.getPrevStepNames(stepMeta.getName()).length>0){
                StreamInterface infoStream = tableInputMeta.getStepIOMeta().getInfoStreams().get(0);
                infoStream.setSubject(transMeta.getPrevStepNames(stepMeta.getName())[0]); //默认选第一个作为输入
            }

        return null;
    }
}
